package com.bizmda.bizsip.sink.processor;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.bizmda.bizsip.common.*;
import com.bizmda.bizsip.config.AbstractSinkConfig;
import com.bizmda.bizsip.service.AppLogService;
import com.bizmda.bizsip.sink.api.SinkCircuitBreaker;
import com.bizmda.log.trace.MDCTraceUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

/**
 * @author shizhengye
 */
@Slf4j
public class SinkMessageListener implements MessageListener {
    private final AbstractSinkProcessor sinkProcessor;
    private final AppLogService appLogService;
    private final Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();
    private final SinkCircuitBreaker sinkCircuitBreaker;
    public SinkMessageListener(AbstractSinkConfig sinkConfig, RabbitTemplate rabbitTemplate, String rabbitmqLog) throws BizException {
        this.sinkCircuitBreaker = new SinkCircuitBreaker(sinkConfig);
        this.appLogService = new AppLogService(rabbitTemplate,rabbitmqLog);
        switch (sinkConfig.getProcessor()) {
            case AbstractSinkConfig.PROCESSOR_DEFAULT:
                this.sinkProcessor = new SinkProcessor(sinkConfig);
                break;
            case AbstractSinkConfig.PROCESSOR_BEAN:
                this.sinkProcessor = new BeanSinkProcessor(sinkConfig);
                break;
            case AbstractSinkConfig.PROCESSOR_SINK_BEAN:
                this.sinkProcessor = new SinkBeanSinkProcessor(sinkConfig);
                break;
            default:
                throw new BizException(BizResultEnum.SINK_TYPE_IS_ERROR);
        }
    }

    @Override
    public void onMessage(Message message) {
        MDCTraceUtils.putTraceId(message.getMessageProperties().getHeader(BizConstant.RABBITMQ_MESSAGE_HEADER_TRACE_ID));
        JSONObject outJsonObject;
        BizMessage<JSONObject> bizMessage = (BizMessage<JSONObject>) jackson2JsonMessageConverter.fromMessage(message);
        log.trace("Sink异步服务收到消息:\n{}", BizUtils.buildBizMessageLog(bizMessage));
        BizMessage<JSONObject> inMessage = BizTools.copyBizMessage(bizMessage);
        if (!(bizMessage.getData() instanceof JSONObject)) {
            bizMessage.setData(JSONUtil.parseObj(bizMessage.getData()));
        }
        try {
            BizTools.bizMessageThreadLocal.set(bizMessage);
            SinkCircuitBreaker.sinkCircuitBreakerThreadLocal.set(this.sinkCircuitBreaker);
            outJsonObject = this.sinkProcessor.process(bizMessage.getData());
            this.appLogService.sendSinkSuccessLog(inMessage,BizMessage.buildSuccessMessage(bizMessage,outJsonObject));
        } catch (BizException e) {
            log.warn("Sink服务执行出错:{}-{}\n{}", e.getCode(),e.getMessage(),e.getExtMessage());
            this.appLogService.sendSinkFailLog(inMessage,BizMessage.buildFailMessage(bizMessage,e));
        }
        finally {
            BizTools.bizMessageThreadLocal.remove();
            SinkCircuitBreaker.sinkCircuitBreakerThreadLocal.remove();
        }
    }
}
